iT邦幫忙

2025 iThome 鐵人賽

DAY 13
0
Software Development

Codetopia 新手日記:設計模式與原則的 30 天學習之旅系列 第 19

Day 19:Mediator(協調中心):交通總動員—終結網狀溝通地獄!

  • 分享至 

  • xImage
  •  

Codetopia 創城記 (19)|Mediator(協調中心):交通總動員—終結網狀溝通地獄!

1) 今日熱點 (故事開場 & 痛點) ⚡️

Codetopia 的週末,空氣中不只有鹹鹹的海風,還瀰漫著一股...混亂的氣息。沒錯,萬眾矚目的「海港音樂祭」即將登場!但在此之前,一場風暴已在市府內部悄悄上演。

想像一下這個畫面:

  • Rhea|道路維修隊領班,正對著對講機大喊:「A 街給我封起來!立刻!」

  • Zoe|通知平台工程師,同時在 Slack 上敲字:「B 區的改道簡訊準備發送!」

  • Owen|交通資料工程師,盯著模擬數據,眉頭緊鎖:「等等!模型顯示 A 街封閉會造成回堵,快調整參數!」

  • Felix|監控工程師,緊急插播:「回報!舞台區的臨時電力接點訊號異常,監控優先權需要提高!」

他們每個人都在為音樂祭努力,但溝通方式卻是最原始的網狀結構——私訊、臨時電話、開了又散的 ad-hoc 會議。命令互相打架,資源互相搶奪。最慘的是,出現了「你以為他撤了,但他以為你先撤了」的經典甩鍋盲點。砰!一聲令下,市府高層的耐心終於告罄:「三十分鐘內,給我拉起一個協調中心!所有人,禁止再直接摳對方!

2) 術語卡 🧭

🧭 術語卡(今日會用到)

  • GoF|Mediator (中介者):用一個中介物件來封裝一系列的物件互動。中介者使各物件不需要明確地相互引用,從而使其耦合鬆散,而且可以獨立地改變它們之間的互動。

  • EIP/EDA|Orchestrator vs ChoreographyOrchestrator (協同指揮) 像個樂團指揮,由中心節點發號施令,決定流程順序與邏輯。Choreography (事件編舞) 則像舞池裡的舞者,沒有指揮,大家各自監聽事件並做出反應。今天,我們需要的是一位指揮家!

  • MAS|Coordinator Agent + DF:在多代理系統中,由「協調者代理」向「黃頁服務 (Directory Facilitator)」查詢有哪些代理具備特定能力,然後主動發起協商,達成任務目標。

3) 笑中帶淚 (反例/壞味道) 😵

讓我們把時間倒回高層發飆前的十分鐘。那簡直是專案管理的災難片現場:Slack 上開了 47 個跟音樂祭有關的 side-channels、三條電話會議同時在線、四張互相重疊的甘特圖在螢幕上閃爍。

Rhea 剛把 A 街的封鎖線拉起來,Owen 的動態路網模型還在運算,根本來不及同步;另一邊的 Zoe 已經按下發送鍵,將「請改道 B 區」的簡訊推播給了上萬市民;Felix 此時又回報,因為電力問題,A 街的管制必須再延長兩小時...

每個人都在「微調」別人的決策,結果就是一場集體參與的「震盪迴路」。沒有人知道全局,也沒有人能下最終決定。

核心壞味道 👃:

  • 同事之間直接耦合,互相呼叫 API。
  • 不斷進行重複的對等協商,沒有最終仲裁點。
  • 缺乏一個「全球序 (Global Order)」,誰先誰後全憑運氣和聲量。

4) 王牌出手 (核心觀念/何時用/不適用) 🦸

平息這場混亂的唯一解法,就是派一位王牌交通警察登場——Mediator (協調中心)

一句話概括:把多對多的網狀互動,收斂成一對多的星狀結構。

所有局處單位(我們稱之為 Colleagues,同事們)不再彼此對話,他們只跟「協調中心」溝通。所有排程、資源仲裁、衝突解決、甚至事後補償,都由協調中心統一處理和下達。

何時用 (When to Use)

  • 互動關係複雜且易變:當一群物件之間的溝通方式錯綜複雜,像蜘蛛網一樣時。特別適用於活動、災防、突發狀況這種臨時性任務編組。

  • 需要一致的決策順序與衝突仲裁:就像這次的音樂祭,封路、通知、用電有嚴格的先後順序和資源排他性,必須有個裁判。

  • 想隱藏實作細節:每個單位只需要對外暴露「意圖」(我想封路),而不需要把自己的 API 細節(block_road(street, start_time, end_time))暴露給所有其他人。

何時不要用 (When NOT to Use)

  • 流程是線性的:如果任務只是簡單的 A→B→C 逐級處理,那用我們之前提過的 責任鏈模式 (Chain of Responsibility) 就夠了,殺雞焉用牛刀。

  • 只是想替換演算法:如果只是想在不同情境下切換不同的處理策略,用 策略模式 (Strategy) 更輕巧。

  • 僅需單向廣播:如果只是單純的一對多事件通知,且發送者不關心接收者的後續反應,用 觀察者模式 (Observer) 實現鬆耦合更佳。

  • 自治的事件舞蹈:如果各系統能透過監聽事件來自發性地完成協作,且不需要中心化的仲裁,那麼選擇 Choreography (事件編舞) 會讓系統更有彈性。

  • 處理長期業務流程:對於需要跨越數日、包含人工審批、或需要持久化的複雜工作流(例如:為期三天的完整活動規劃與資源調度),應交給更專業的 Workflow EngineSaga 模式處理。Mediator 專注於即時、短生命週期的協調。

一句話對位總結

  • Mediator: 多方協調中心

  • Observer: 單向事件廣播

  • Chain of Responsibility: 線性逐級審批

  • Strategy: 封裝可替換演算法

5) 導播切景 (表格+兩張 Mermaid) 🎬

導播,鏡頭拉一下!讓我們用三個不同焦段來看看這個「協調中心」的設計藍圖。

視角 觀念/模式 在 Codetopia 的說法
微觀 (GoF) Mediator 封裝了 Colleague 之間的互動 音樂祭調度中心 (FestivalMediator) 統一處理道路隊、通知、監控等單位的請求。
中觀 (EIP/EDA) Orchestrator 依序執行,處理補償 協同指揮官 收到「場佈」指令,依序執行:① 封路 → ② 推播通知 → ③ 開啟監控,並規劃好失敗時的補償路徑。
宏觀 (MAS) Coordinator Agent 透過 DF 查詢能力並進行協商 協調者代理 向黃頁查詢:「誰能在 T1 時段封路且可撤回?」然後發送包含時窗和補償條款的協商指令。

微觀 GoF 結構圖:

註:此處 coordinate 負責接收意圖。補償邏輯則封裝在各單位執行的 Command 物件中 (參考 Day 17),由 Mediator 依事件結果決定是否派發補償命令。

https://ithelp.ithome.com.tw/upload/images/20251003/20178500UEYi9XPCeL.png

中觀 EIP/EDA 資訊流:

https://ithelp.ithome.com.tw/upload/images/20251003/20178500CLxtFiKNeP.png

6) 最小實作 (程式碼範例) 💻

Sasha|Event Ops 協調官,臨危受命。她深吸一口氣,拋棄了原本混亂的程式碼,採用了事件驅動、命令模式與資源租約的健壯架構。

from __future__ import annotations
from abc import ABC, abstractmethod
import collections
import time
import uuid

# --- Command Pattern (as per Day 17) ---
class Command(ABC):
    @abstractmethod
    def execute(self): pass
    @abstractmethod
    def undo(self): pass

# --- Interfaces and Base Classes ---
class Mediator(ABC):
    @abstractmethod
    def coordinate(self, sender: object, intent: str, payload: dict = None) -> None: pass

class Colleague:
    def __init__(self, name: str) -> None:
        self.name = name
        self._mediator = None
    def set_mediator(self, mediator: Mediator): self._mediator = mediator
    def send_intent(self, intent: str, payload: dict = None):
        if self._mediator: self._mediator.coordinate(self, intent, payload)

# --- Concrete Colleagues ---
class RoadCrew(Colleague):
    def block_road(self, road_id: str, ctx: dict):
        print(f"🚧 {self.name}: Executing command to block road {road_id}.")
        self.send_intent("road_blocked", ctx)
    def clear_road(self, road_id: str, ctx: dict):
        print(f"✅ {self.name}: Executing UNDO to clear road {road_id}.")
        self.send_intent("road_cleared", ctx)

class NotificationService(Colleague):
    def send_alert(self, message: str, should_fail: bool, ctx: dict):
        print(f"📢 {self.name}: Executing command to send alert -> '{message}'")
        if should_fail:
            print(f"🔥 {self.name}: API call failed!")
            self.send_intent("notification_failed", ctx)
        else: self.send_intent("notification_sent", ctx)

class MonitoringService(Colleague):
    def start_monitoring(self, area: str, ctx: dict):
        print(f"📡 {self.name}: Executing command to monitor area {area}.")
        self.send_intent("monitoring_started", ctx)

# --- Concrete Commands ---
class BlockRoadCommand(Command):
    def __init__(self, receiver: RoadCrew, road_id: str, ctx: dict):
        self.receiver, self.road_id, self.ctx = receiver, road_id, ctx
    def execute(self): self.receiver.block_road(self.road_id, self.ctx)
    def undo(self): self.receiver.clear_road(self.road_id, self.ctx)

class SendAlertCommand(Command):
    def __init__(self, receiver: NotificationService, message: str, fail: bool, ctx: dict):
        self.receiver, self.message, self.fail, self.ctx = receiver, message, fail, ctx
    def execute(self): self.receiver.send_alert(self.message, self.fail, self.ctx)
    def undo(self): print(f"↩️ UNDO for SendAlert is a no-op or requires a retraction API call.")

# --- Supporting Services ---
class LeaseManager:
    def __init__(self): self._leases = {} # resource -> (holder, expiry_time)
    def acquire(self, resource: str, holder: str, ttl_seconds: int = 30) -> bool:
        now = time.time()
        if resource in self._leases and self._leases[resource][1] > now: return False
        self._leases[resource] = (holder, now + ttl_seconds); return True
    def release(self, resource: str, holder: str) -> bool:
        if self._leases.get(resource, (None, 0))[0] == holder:
            self._leases.pop(resource, None); return True
        return False
    def renew(self, resource: str, holder: str, ttl_seconds: int = 30) -> bool:
        """Extend a lease if the same holder still owns it."""
        now = time.time()
        if self._leases.get(resource, (None, 0))[0] != holder:
            return False
        self._leases[resource] = (holder, now + ttl_seconds)
        return True

# --- Concrete Mediator (Production Grade) ---
class FestivalMediator(Mediator):
    def __init__(self) -> None:
        self._colleagues = {}
        self._event_queue = collections.deque()
        self._is_processing = False
        self._seq_counter = 0
        self._command_history = collections.defaultdict(list)
        self._lease_manager = LeaseManager()
        self._log = []
        self._seen_keys = set()

    def register(self, colleague: Colleague, *capabilities: str):
        """Register a colleague under one or more capabilities and wire mediator back-reference."""
        colleague.set_mediator(self)
        for cap in capabilities:
            self._colleagues.setdefault(cap, []).append(colleague)
    def pick(self, capability: str) -> Colleague: return self._colleagues[capability][0]

    def coordinate(self, sender: object, intent: str, payload: dict = None) -> None:
        self._seq_counter += 1
        event = (self._seq_counter, sender, intent, payload or {})
        print(f"📥 Event #{event[0]} received: '{intent}'. Queued.")
        self._event_queue.append(event)
        self._process_queue()

    def _execute_command(self, command: Command, saga_id: str):
        self._command_history[saga_id].append(command)
        command.execute()

    def _compensate(self, saga_id: str):
        print(f"--- ⚠️ Initiating Compensation for Saga {saga_id} ---")
        history = self._command_history.pop(saga_id, [])
        for command in reversed(history): command.undo()

    def _process_queue(self):
        if self._is_processing: return
        self._is_processing = True
        while self._event_queue:
            seq, sender, intent, payload = self._event_queue.popleft()

            # 冪等性保護 & 決策日誌
            idempotency_key = (intent, tuple(sorted(payload.items())))
            if idempotency_key in self._seen_keys: continue
            self._seen_keys.add(idempotency_key)
            self._log.append({"seq": seq, "intent": intent, "payload": payload})
            print(f"🧠 Processing event #{seq}: '{intent}'...")

            # Accept both flat ctx ({"saga_id": ...}) and nested payload["ctx"]["saga_id"]
            saga_id = payload.get("saga_id") or payload.get("ctx", {}).get("saga_id")

            if intent == "initiate_setup":
                ctx = {"saga_id": f"setup-{uuid.uuid4().hex[:6]}", "road_id": payload["road_id"]}
                cmd = BlockRoadCommand(self.pick("road_blocker"), payload["road_id"], ctx)
                self._execute_command(cmd, ctx["saga_id"])

            elif intent == "road_blocked":
                ctx = payload
                msg = f"Road {ctx['road_id']} is closed."
                fail = payload.get("simulate_failure", False)
                cmd = SendAlertCommand(self.pick("alerter"), msg, fail, ctx)
                self._execute_command(cmd, saga_id)

            elif intent == "notification_sent":
                if self._lease_manager.acquire("temp_power", saga_id):
                    print("💡 Lease acquired for temp_power.")
                    self.pick("monitor").start_monitoring("Stage A", payload)
                else:
                    print("⏳ Power busy. Re-queuing monitoring task.")
                    # Add bounded retry counter to avoid idempotency de-dup
                    payload = dict(payload)
                    payload["attempt"] = payload.get("attempt", 0) + 1
                    if payload["attempt"] <= 3:
                        self.coordinate(self, "notification_sent", payload)  # 重試 (最多 3 次)
                    else:
                        print("🛑 Monitoring skipped after max retries due to power contention.")

            elif intent == "monitoring_started":
                print("--- ✅ Festival Setup Sequence Complete ---")
                # Release temp power lease on completion
                if saga_id:
                    self._lease_manager.release("temp_power", saga_id)
                self.coordinate(self, "setup_complete", payload)

            elif intent == "notification_failed":
                self._compensate(saga_id)
        self._is_processing = False

# --- Client Code ---
if __name__ == "__main__":
    mediator = FestivalMediator()
    mediator.register(RoadCrew("RoadCrew-1"), "road_blocker")
    mediator.register(NotificationService("Notifier-A"), "alerter")
    mediator.register(MonitoringService("Monitor-Main"), "monitor")

    print("\n--- SCENARIO 1: Successful Flow ---")
    mediator.coordinate(None, "initiate_setup", {"road_id": "A"})

    print("\n\n--- SCENARIO 2: Failure and Compensation Flow ---")
    payload = {"road_id": "B", "simulate_failure": True}
    mediator.coordinate(None, "initiate_setup", payload)

7) 鄉民出題 (動手+反模式紅旗) 🚩

動手題

  1. 強化補償邏輯:目前的 SendAlertCommandundo() 只是印出訊息。請實作一個更有意義的補償:發送一則「更正啟事」的通知。這需要 NotificationService 新增一個 send_retraction 方法。

  2. 租約續期 (Renew):在 LeaseManager 中加入 renew(resource, holder) 方法。修改 FestivalMediator,讓它在 monitoring_started 事件後,如果流程尚未結束,能為 temp_power 的租約續期,避免因長時間監控導致租約過期。

反模式紅旗 🚩

  • 🚩 協調中心變成「超級上帝類」:Mediator 應該只負責「協調」,而不是把所有業務邏輯、資料處理、執行細節全攬在身上。如果你的 Mediator 程式碼超過千行,那它可能已經變成萬能的上帝了。

  • 🚩 同事偷跑,繞過協調中心:如果在程式碼中看到某個 Colleague 物件,竟然還保留著對另一個 Colleague 的直接引用,並且呼叫了它的方法,這就破壞了 Mediator 的初衷。

  • 🚩 只有命令,沒有善後:一個好的協調中心不僅要會發號施令,更要處理失敗時的補償 (Compensation) 機制。只管殺,不管埋,是典型的不負責任 Mediator。

  • 🚩 錯把長期流程當協調:Mediator 適合處理短期的、緊密的互動。對於跨越多天、多個 session 的長期業務流程,應該交給更專業的 Saga 模式或流程引擎 (Workflow Engine)。

二選一,你會怎麼做?

突發狀況!開場前一小時,氣象局發布豪雨特報,同時,入口處人流瞬間暴增三倍。身為總指揮的你,會選擇:

A. 序列化處理:由 Mediator 立即下達一連串指令:「暫停入場 -> 推播延期通知 -> 開啟備用避雨通道 -> 重新計算人流模型」。所有動作嚴格按照順序執行。

B. 改為事件舞蹈:Mediator 發出一個「緊急狀況」事件,讓各子系統(入場、通知、通道、人流)各自監聽並獨立做出最快反應。

請留言選擇 A 或 B,並用一句話說明你的理由!

8) 測試指北 (Testing Guide) 🧪

  • 契約測試 (Contract Testing):編寫一個測試,確保任何 Colleague 的實例都沒有直接引用另一個 Colleague 的實例。可以使用 Mock 或測試替身來攔截非法呼叫,一旦發生直接呼叫就讓測試失敗。

  • 補償與順序測試 (Compensation & Order Testing):設計一個情境測試,模擬「推播服務延遲後才成功」的狀況,驗證補償邏輯 不會 被錯誤觸發。再設計一個「重試三次後才成功」的情境,驗證最終流程的正確性。

  • 資源租約測試 (Lease Testing):針對 LeaseManager 編寫單元測試,驗證租約在 TTL 後確實會過期,以及當一個持有者釋放資源後,排隊等候者能成功獲取資源。

9) 城市望遠鏡 (進階視野) 🔭

  • EIP/EDA (中觀):今天的 FestivalMediator 就是一個 Orchestrator。在更大型的系統中,Orchestrator 只負責定義流程順序與仲裁邏輯,實際的命令會透過一個共享的 Command Bus 來派發。而流程的執行狀態,則可以透過 Iterator 模式來做巡檢與回放,這與 Day 18 的精神一致。

  • MAS (宏觀):我們可以把「封路」、「推播」、「監控」這些行為,抽象化為「能力 (Capability)」。協調者代理 (Coordinator Agent) 會先向黃頁 (DF) 查詢目前有哪些代理具備這些能力,然後才根據任務需求,動態地與它們進行協商。決策邏輯可以更複雜,例如基於拓撲排序 (Topological Sort)優先級時間窗口,並允許不互相競爭資源的步驟並行執行

10) 結語 & 預告 ✨

多方別硬聊,交給協調;序與補償,一口出清。

今天的海港音樂祭總算在 Sasha 的指揮下步上軌道。但新的問題來了,臨時交管的狀態有「準備中」、「已生效」、「已解除」、「緊急暫停」等多種模式,這些狀態之間的轉換邏輯如果都寫在 if-else 裡,恐怕又是一場災難...

明日預告:Day 20|State(狀態機):是時候把臨時管制的「紅黃綠」燈號誌化,讓狀態轉移的邏輯不再四處散落!


11) 附錄:ASCII 版圖示

為了確保在不支援 Mermaid 渲染的環境中也能正常閱讀,以下提供文中圖表的 ASCII 替代版本:

GoF 結構圖 (ASCII 版)

                    ┌─────────────────┐
                    │    <<interface>> │
                    │    Mediator     │
                    │─────────────────│
                    │ +coordinate()   │
                    └─────────┬───────┘
                              │
                              ▲ implements
                              │
                    ┌─────────┴───────┐
                    │ FestivalMediator │
                    │─────────────────│
                    │ -colleagues: map │
                    │ +register()     │
                    │ +coordinate()   │
                    └─────────┬───────┘
                              │
                              │ manages
                              ▼
    ┌──────────────────────────────────────────────────────────┐
    │                                                          │
    ▼                         ▼                         ▼      │
┌─────────┐              ┌─────────────┐          ┌─────────────┐
│RoadCrew │              │Notification │          │ Monitoring  │
│         │              │  Service    │          │  Service    │
│─────────│              │─────────────│          │─────────────│
│+execute()│             │ +execute()  │          │ +execute()  │
└─────────┘              └─────────────┘          └─────────────┘
     │                         │                         │
     └─────────────────────────┼─────────────────────────┘
                               ▲
                  inherits from Colleague
                               │
                    ┌─────────────────┐
                    │   <<abstract>>   │
                    │   Colleague     │
                    │─────────────────│
                    │ -mediator       │
                    │ +send_intent()  │
                    └─────────────────┘

時序圖 (ASCII 版)

Sasha      FestivalMediator     RoadCrew    NotificationService   MonitoringService
  │               │                 │               │                    │
  │ initiate_     │                 │               │                    │
  │ setup        │                 │               │                    │
  │──────────────>│                 │               │                    │
  │               │ execute(        │               │                    │
  │               │ BlockRoad       │               │                    │
  │               │ Command)        │               │                    │
  │               │────────────────>│               │                    │
  │               │                 │ road_blocked  │                    │
  │               │<────────────────│               │                    │
  │               │ execute(        │               │                    │
  │               │ SendAlert       │               │                    │
  │               │ Command)        │               │                    │
  │               │─────────────────────────────────>│                    │
  │               │                 │ notification_ │                    │
  │               │                 │ sent          │                    │
  │               │<─────────────────────────────────│                    │
  │               │ execute(        │               │                    │
  │               │ StartMonitor    │               │                    │
  │               │ Command)        │               │                    │
  │               │───────────────────────────────────────────────────────>│
  │               │                 │               │  monitoring_      │
  │               │                 │               │  started          │
  │               │<───────────────────────────────────────────────────────│
  │ setup_        │                 │               │                    │
  │ complete      │                 │               │                    │
  │<──────────────│                 │               │                    │

網狀 vs 星狀結構對比 (ASCII 版)

【問題】網狀直接耦合:                    【解法】星狀協調中心:

    RoadCrew ←──────→ NotificationService           RoadCrew
        │   ╲      ╱   │                               │
        │    ╲    ╱    │                               │
        │     ╲  ╱     │                               ▼
        │      ╱╲      │                      ┌─────────────────┐
        │     ╱  ╲     │                      │ FestivalMediator │
        │    ╱    ╲    │                      │   協調中心      │
        ▼   ╱      ╲   ▼                      └─────────────────┘
   MonitoringService                                   ▲
                                                       │
   複雜度: O(n²)                                       │
   職責混亂、難以維護                         NotificationService
                                                       │
                                                       ▼
                                              MonitoringService

                                              複雜度: O(n)
                                              職責清晰、易於擴展

事件處理流程 (ASCII 版)

┌─────────────┐     ┌─────────────────┐     ┌──────────────────┐
│ Event Queue │────>│ Event Processor │────>│ Command History  │
│             │     │                 │     │   & Saga Log     │
│ ┌─────────┐ │     │ ┌─────────────┐ │     │                  │
│ │Event #1 │ │     │ │ Idempotency │ │     │ ┌──────────────┐ │
│ │Event #2 │ │     │ │   Check     │ │     │ │ Saga: setup- │ │
│ │Event #3 │ │     │ │             │ │     │ │   abc123     │ │
│ └─────────┘ │     │ └─────────────┘ │     │ │ - BlockRoad  │ │
│             │     │        │        │     │ │ - SendAlert  │ │
└─────────────┘     │        ▼        │     │ │ - Monitor    │ │
                    │ ┌─────────────┐ │     │ └──────────────┘ │
     失敗時觸發      │ │ Compensation│ │     │                  │
    ┌───────────────┤ │   Logic     │ │     └──────────────────┘
    │               │ └─────────────┘ │              ▲
    ▼               └─────────────────┘              │
┌─────────────┐                                     │
│ Undo Stack  │                            Command History
│             │                               Referenced
│ Clear Road  │<────────────────────────────────────┘
│ (Rollback)  │
└─────────────┘

上一篇
Day 18:逐站巡覽(Iterator):攤平宇宙的統一「游標」,終結巢狀迴圈地獄!
下一篇
Day 20:State(狀態機/號誌)—— 臨時交管的「紅黃綠」會說話
系列文
Codetopia 新手日記:設計模式與原則的 30 天學習之旅23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言